package com.spring.batch.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

/**
 * Assembles the {@code dataHandleJob} Spring Batch job: a single chunk-oriented step that
 * reads an in-memory list of {@link User} objects, relabels each user's name by age
 * bracket, and logs how many items each chunk handed to the writer.
 *
 * @author liuzhiqiang
 */
@Slf4j
@Component
public class DataBatchJob {

    /** Number of items processed before the writer is invoked once. */
    private static final int CHUNK_SIZE = 100;
    /** Value passed to {@code retryLimit} for the fault-tolerant step. */
    private static final int RETRY_LIMIT = 3;
    /** Value passed to {@code skipLimit} for the fault-tolerant step. */
    private static final int SKIP_LIMIT = 100;
    /** Number of batches of demo users generated by the reader. */
    private static final int READER_BATCHES = 5;
    /** Number of demo users per generated batch. */
    private static final int READER_BATCH_SIZE = 100;
    /** Pause before each generated batch, in milliseconds. */
    private static final long READER_BATCH_DELAY_MS = 1000L;

    /**
     * Job builder factory, used to build the Job. Injection is optional, so this may be
     * {@code null} when no such bean exists.
     */
    @Autowired(required = false)
    private JobBuilderFactory jobBuilderFactory;
    /**
     * Step builder factory, used to build the Step. Injection is optional, so this may be
     * {@code null} when no such bean exists.
     */
    @Autowired(required = false)
    private StepBuilderFactory stepBuilderFactory;
    /**
     * Custom job listener registered on the job.
     */
    @Autowired
    private JobListener jobListener;

    /**
     * Builds the job named {@code dataHandleJob}, consisting of the single data-handling
     * step, with a {@link RunIdIncrementer} and the injected job listener.
     *
     * <p>Every call builds a new job and a new reader; building the reader pauses before
     * each generated batch, so a call normally blocks for several seconds.
     *
     * @return the assembled job
     * @throws IllegalStateException if the optional job or step builder factory was not injected
     */
    public Job dataHandleJob() {
        return requireInjected(jobBuilderFactory, "JobBuilderFactory")
                .get("dataHandleJob")
                .incrementer(new RunIdIncrementer())
                .start(handleDataStep())
                .listener(jobListener)
                .build();
    }

    /**
     * Builds the step named {@code getData}. A basic step has three parts:
     * an ItemReader that supplies the data, an ItemProcessor that transforms it,
     * and an ItemWriter that consumes it.
     *
     * @return the assembled step
     * @throws IllegalStateException if the optional step builder factory was not injected
     */
    private Step handleDataStep() {
        return requireInjected(stepBuilderFactory, "StepBuilderFactory")
                .get("getData")
                // <input type, output type>; the chunk size works much like a commit interval:
                // the writer is invoked once per CHUNK_SIZE processed items.
                .<User, User>chunk(CHUNK_SIZE)
                // Fault tolerance: any Exception is retryable (retry limit RETRY_LIMIT) and
                // skippable (skip limit SKIP_LIMIT). The retry limit is 3, not 100.
                .faultTolerant()
                .retryLimit(RETRY_LIMIT)
                .retry(Exception.class)
                .skipLimit(SKIP_LIMIT)
                .skip(Exception.class)
                // ItemReader for the step
                .reader(getDataReader())
                // ItemProcessor for the step
                .processor(getDataProcessor())
                // ItemWriter for the step
                .writer(getDataWriter())
                .build();
    }

    /**
     * Creates a writer that only logs the size of each chunk it receives.
     *
     * @return the logging writer
     */
    private ItemWriter<? super User> getDataWriter() {
        return list -> {
            log.info("消费数据:{}条", list.size());
        };
    }

    /**
     * Creates a processor that overwrites the user's username with an age-bracket label:
     * above 80, above 60, above 40, or everything else.
     *
     * @return the labelling processor; it returns the same (mutated) user instance
     */
    private ItemProcessor<? super User, User> getDataProcessor() {
        return user -> {
            if (user.getAge() > 80) {
                user.setUsername("老年");
            } else if (user.getAge() > 60) {
                user.setUsername("中老年");
            } else if (user.getAge() > 40) {
                user.setUsername("中年");
            } else {
                user.setUsername("青年");
            }
            return user;
        };
    }

    /**
     * Creates a reader over a freshly generated in-memory list of default-constructed users,
     * produced in {@link #READER_BATCHES} batches of {@link #READER_BATCH_SIZE} with a pause
     * before each batch.
     *
     * <p>If the thread is interrupted while pausing, the interrupt flag is restored so the
     * caller can observe it; the remaining batches are still generated, so the reader
     * always holds the full data set.
     *
     * @return a reader over the generated users
     */
    private ItemReader<User> getDataReader() {
        List<User> users = new ArrayList<>(READER_BATCHES * READER_BATCH_SIZE);
        for (int batch = 0; batch < READER_BATCHES; batch++) {
            try {
                Thread.sleep(READER_BATCH_DELAY_MS);
            } catch (InterruptedException e) {
                // Restore the flag instead of swallowing the interruption; later sleeps then
                // return immediately via the same path.
                Thread.currentThread().interrupt();
                log.warn("Interrupted while waiting before loading batch {}", batch, e);
            }
            for (int item = 0; item < READER_BATCH_SIZE; item++) {
                users.add(new User());
            }
            log.info("追加100条数据");
        }

        return new ListItemReader<>(users);
    }

    /**
     * Returns the given optionally-injected bean, failing with a descriptive message
     * instead of a bare NullPointerException when it is absent.
     *
     * @param bean     the injected reference, possibly {@code null}
     * @param beanName name used in the error message
     * @param <T>      bean type
     * @return {@code bean}, never {@code null}
     * @throws IllegalStateException if {@code bean} is {@code null}
     */
    private static <T> T requireInjected(T bean, String beanName) {
        if (bean == null) {
            throw new IllegalStateException(
                    beanName + " was not injected; is Spring Batch configuration enabled?");
        }
        return bean;
    }
}
